Gwork.cc

production 75e669
gwork.cc
/RabbitMQ 学习笔记/

RabbitMQ 学习笔记

面向 PHP 开发者的 RabbitMQ 核心笔记,包含基础概念、常见代码示例、死信队列、幂等处理、常见错误等。


一、RabbitMQ 是什么

RabbitMQ 是一个 消息队列系统(Message Queue),用于系统之间的 异步通信、解耦、削峰填谷

常见使用场景:

  • 异步任务(发送短信 / 邮件)
  • 订单处理
  • 日志收集
  • 秒杀系统削峰
  • 系统解耦

示意结构:

Producer -> Exchange -> Queue -> Consumer

二、RabbitMQ 核心概念

1 Producer(生产者)

发送消息的一方。

PHP 示例:

$channel->basic_publish($msg, $exchange, $routing_key);

2 Exchange(交换机)

负责 接收消息并路由到 Queue

RabbitMQ 不允许直接发送到 Queue:

Producer -> Exchange -> Queue

Exchange 类型:

| 类型 | 说明 | | ------- | -------------------- | | direct | 精确匹配 routing key | | fanout | 广播 | | topic | 通配符匹配 | | headers | header 匹配 |


3 Queue(队列)

用于存储消息,消费者从队列中读取消息。

$channel->queue_declare(
    'order.queue',
    false,
    true,
    false,
    false
);

参数说明:

| 参数 | 含义 | | ----------- | ---------- | | durable | 是否持久化 | | exclusive | 是否独占 | | auto_delete | 自动删除 |


4 Routing Key

Routing Key 用于 Exchange 和 Queue 的匹配

例如:

routing_key = order.create

三、Exchange 与 Queue 绑定

Exchange 必须绑定 Queue 才能发送消息。

$channel->queue_bind(
    'order.queue',
    'order.exchange',
    'order.create'
);

结构:

Exchange
   |
routing_key
   |
Queue

四、PHP Producer 示例

安装库:

composer require php-amqplib/php-amqplib

发送消息:

use PhpAmqpLib\Message\AMQPMessage;

$msg = new AMQPMessage('hello world');

$channel->basic_publish(
    $msg,
    'order.exchange',
    'order.create'
);

参数说明:

basic_publish(message, exchange, routing_key)

五、PHP Consumer 示例

$callback = function ($msg) {
    echo "receive: " . $msg->body . "\n";
};

$channel->basic_consume(
    'order.queue',
    '',
    false,
    true,
    false,
    false,
    $callback
);

while ($channel->is_consuming()) {
    $channel->wait();
}

六、ACK 机制

RabbitMQ 有两种确认方式。

自动确认

auto_ack = true

问题:

consumer 崩溃
消息可能丢失

手动确认(推荐)

$msg->ack();

处理流程:

consumer处理
   |
成功
   |
ack

失败:

nack / reject

七、RabbitMQ 持久化

为了防止 RabbitMQ 重启导致消息丢失,需要开启持久化。

1 Exchange 持久化

$channel->exchange_declare(
    'order.exchange',
    'direct',
    false,
    true,
    false
);

2 Queue 持久化

$channel->queue_declare(
    'order.queue',
    false,
    true,
    false,
    false
);

3 Message 持久化

$msg = new AMQPMessage(
    $data,
    ['delivery_mode' => 2]
);

八、消息重复消费(幂等问题)

RabbitMQ 在某些情况下可能会 重复投递消息

  • consumer 处理完成但未 ack
  • 网络异常
  • consumer 重启

解决方法:

1 使用唯一业务 ID

例如:

order_id
message_id

2 建立去重表

mq_message

字段示例:

| 字段 | 含义 | | ---------- | ----------- | | id | 主键 | | message_id | 消息唯一 ID | | status | 处理状态 |

消费前:

INSERT INTO mq_message(message_id)

如果插入失败说明:

消息已经处理过

九、死信队列(Dead Letter Queue)

死信队列用于处理:

  • 消费失败
  • TTL 过期
  • 队列满

死信产生的三种情况

1️⃣ 消息被拒绝

basic.reject
basic.nack

2️⃣ 消息过期

TTL

3️⃣ 队列满


死信队列配置

队列参数:

x-dead-letter-exchange
x-dead-letter-routing-key

示例:

use PhpAmqpLib\Wire\AMQPTable;

$args = new AMQPTable([
    'x-dead-letter-exchange' => 'order.dlx.exchange'
]);

死信流程

queue1
  |
消费失败
  |
dead letter
  |
DLX
  |
queue2

十、RabbitMQ 常见错误

1 AMQPTable not found

原因:

use PhpAmqpLib\Wire\AMQPTable;

没有引入类。


2 PRECONDITION_FAILED

示例错误:

inequivalent arg 'x-dead-letter-exchange'

原因:

queue 参数与已存在队列不一致

解决:

  • 删除旧 queue
  • 或保持参数一致

十一、RabbitMQ 常见问题

1 RabbitMQ 如何保证消息不丢?

需要做到三点:

  1. Exchange 持久化
  2. Queue 持久化
  3. Message 持久化

2 如何保证消息不重复消费?

解决方案:

  • 消息唯一 ID
  • 幂等设计
  • Redis / 数据库去重

3 RabbitMQ 如何处理消费失败?

常见方案:

  • 重试机制
  • 死信队列
  • 延迟队列

演示

producer.php

<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;


// 1. 建立连接
$connection = new AMQPStreamConnection(
    'rabbitmq',
    5672,
    'guest',
    'guest'
);

$channel = $connection->channel();

$channel->exchange_declare(
    'order.exchange',
    'direct',
    false,
    true,
    false
);

// 2. 声明 durable queue
$queueName = 'order.queue';

$channel->queue_declare(
    $queueName,
    false,  // passive
    true,   // durable
    false,  // exclusive
    false,   // auto_delete
    false,
    new AMQPTable([
        'x-dead-letter-exchange' => 'order.dlx.exchange',
        'x-dead-letter-routing-key' => 'order.dlq'
    ])

);

// 3. 构造消息(persistent)
$data = [
    'order_id' => uniqid(),
    'amount'   => 100,
    'time'     => date('Y-m-d H:i:s')
];

$msg = new AMQPMessage(
    json_encode($data, JSON_UNESCAPED_UNICODE),
    [
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
    ]
);

$channel->queue_bind('order.queue', 'order.exchange', 'order');


// 4. 发送消息
$channel->basic_publish(
    $msg,
    'order.exchange',
    'order'
);


echo " [x] Send order: {$data['order_id']}\n";

// 5. 关闭资源
$channel->close();
$connection->close();

consumer.php

<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection(
    'rabbitmq',
    5672,
    'guest',
    'guest'
);

$channel = $connection->channel();


$args = new AMQPTable([
    'x-dead-letter-exchange'    => 'order.dlx.exchange',
    'x-dead-letter-routing-key'=> 'order.dlq',
]);

$channel->queue_declare(
    'order.queue',
    false,
    true,
    false,
    false,
    false,
    $args
);

$queueName = 'order.queue';


// 一次只处理一条
$channel->basic_qos(null, 1, null);

// ================== 新增部分开始 ==================
$running = true;

// 监听 SIGTERM / SIGINT
pcntl_signal(SIGTERM, function () use (&$running) {
    echo " [!] SIGTERM received, stop consuming...\n";
    $running = false;
});

pcntl_signal(SIGINT, function () use (&$running) {
    echo " [!] SIGINT received, stop consuming...\n";
    $running = false;
});
// ================== 新增部分结束 ==================

echo " [*] Waiting for messages. To exit press CTRL+C\n";

$callback = function ($msg) {
    $data = json_decode($msg->body, true);
    $orderId = $data['order_id'];

    echo " [>] Processing order {$orderId}\n";

    // 模拟耗时任务
    // sleep(2);

    echo " [✓] Done {$orderId}\n";

    // 业务成功后 ACK
    $msg->ack();
};

$channel->basic_consume(
    $queueName,
    '',
    false,
    false,
    false,
    false,
    $callback
);

// ================== 核心循环修改 ==================
while ($running && $channel->is_consuming()) {
    pcntl_signal_dispatch(); // 关键
    try {
        $channel->wait(null, false, 1);
    } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
        // 空闲超时,忽略
    }
}

// 退出前关闭资源
echo " [*] Graceful shutdown\n";
$channel->close();
$connection->close();